package com.dongzili.debezium.config;

import io.debezium.config.Configuration;
import io.debezium.connector.mysql.MySqlConnector;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;
import io.debezium.relational.history.FileDatabaseHistory;
import org.apache.kafka.connect.storage.FileOffsetBackingStore;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.IOException;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Component
public class DebeziumListener {

    private ExecutorService executor;
    private DebeziumEngine<ChangeEvent<String, String>> engine;

    @PostConstruct
    public void start() {
        Properties props = genProps();

        engine = DebeziumEngine.create(Json.class)
                .using(props)
                .notifying(record -> {
                    // record中会有操作的类型（增、删、改）和具体的数据
                    // key是主键
//                            System.out.println("record = " + record);
                    System.out.println("record.destination() = " + record.destination());
                    System.out.println("record.key() = " + record.key());
                    System.out.println("record.value() = " + record.value());


                })
                .using((success, message, error) -> {
                    // 强烈建议加上此部分的回调代码，方便查看错误信息

                    if (!success && error != null) {
                        // 报错回调
                        System.out.println("----------error------");
                        System.out.println(message);
                        System.out.println(error);
                        System.out.println(success);

                    }
                }).build();

        executor = Executors.newSingleThreadExecutor();
        executor.execute(engine);
    }

    @PreDestroy
    public void stop() throws IOException {
        if (engine != null) engine.close();//stop();
        if (executor != null) executor.shutdown();
    }

    private static Properties genProps() {
        // 配置
        Properties props = new Properties();
        // 在maven处引入其他数据库的连接器，例如debezium-connector-postgres，再修改此处的connector.class，即可使用其他数据库的CDC
        props.setProperty("connector.class", MySqlConnector.class.getCanonicalName());
        props.setProperty("database.server.name", "my_server_01"); // 可以任意修改
        props.setProperty("database.hostname", "localhost"); // IP
        props.setProperty("database.port", String.valueOf(3306)); // 端口
        props.setProperty("database.user", "root"); // 用户
        props.setProperty("database.password", "root"); // 密码
        props.setProperty("database.serverTimezone", "UTC"); // 时区
        // 下面两个是数据库和表，注意只能选择一种:
        // 1. 使用database.whitelist，只设置数据库（会通知全库的CDC信息）
        // 2. 使用table.whitelist，设置库名和表名（会通知单个库的单个表的CDC信息）
//        props.setProperty("database.whitelist", "wlxtdb");
        props.setProperty("table.whitelist", "wlxtdb.s_browse_log"); // 库.表名

        props.setProperty("name", "engine");
        props.setProperty("key.converter.schemas.enable", "false");
        props.setProperty("value.converter.schemas.enable", "false");
        props.setProperty("include.schema.changes", "false");
        props.setProperty("tombstones.on.delete", "false");
        props.setProperty("database.history", FileDatabaseHistory.class.getCanonicalName());
        props.setProperty("database.history.store.only.monitored.tables.ddl", "true");
        props.setProperty("database.history.file.filename", "./dbhistory.dat");
        props.setProperty("database.history.instance.name", UUID.randomUUID().toString());
        props.setProperty("database.history.skip.unparseable.ddl", "true");
        // 偏移量持久化配置
        props.setProperty("offset.storage", FileOffsetBackingStore.class.getCanonicalName());
        props.setProperty("offset.storage.file.filename", "./offsets.dat");
        props.setProperty("offset.flush.interval.ms", "6000");
        return props;
    }
}